package nodex

import (
	"encoding/json"
	"fmt"
	"gitee.com/zhongguo168a/go-nodex/logx"
	"gitee.com/zhongguo168a/go-nodex/nodex/mq"
	"gitee.com/zhongguo168a/gocodes/datax"
	"gitee.com/zhongguo168a/gocodes/myx/errorx"
	"gitee.com/zhongguo168a/gocodes/myx/eventx"
	"github.com/nsqio/go-nsq"
)

// initMQ sets up the node's message-queue consumers. It does nothing when no
// message queue is configured, or when either the producer address or the
// lookupd address is empty.
func (n *Node) initMQ() {
	queue := n.config.MsgQueue
	if queue == nil {
		return
	}

	nsqCfg := queue.(*NsqMsgQueue)
	if nsqCfg.ProducerAddr != "" && nsqCfg.LookupdAddr != "" {
		n.initMQUpdateConfig()
	}
}
// initMQUpdateConfig creates the consumer for the "node_update_config" topic.
// The handler currently accepts every message without acting on it.
//
// The method has no error return, so a failure to create the consumer is
// logged rather than silently dropped.
func (n *Node) initMQUpdateConfig() {
	mqconfig := n.config.MsgQueue.(*NsqMsgQueue)

	// Placeholder handler: acknowledges the message and does nothing else.
	handler := nsq.HandlerFunc(func(message *nsq.Message) error {
		return nil
	})

	if _, err := mq.CreateConsumer(mqconfig.LookupdAddr, "node_update_config", GetIdent(), nsq.NewConfig(), handler); err != nil {
		logx.Error(errorx.Wrap(err, "create consumer", datax.M{"topic": "node_update_config"}))
	}
}

// RegisterEvent creates a consumer for the topic "node_"+eventName. Each
// received message body is decoded as a JSON object and dispatched through
// eventx.GetEvent("node_"+eventName).
//
// GetIdent() is passed as the third argument to mq.CreateConsumer
// (presumably the NSQ channel name; confirm against mq.CreateConsumer).
//
// It returns an error when the message queue is not configured as
// *NsqMsgQueue or when the consumer cannot be created. Decode and dispatch
// failures inside the handler are logged only; the handler still returns nil.
func (n *Node) RegisterEvent(eventName string) error {
	// Comma-ok assertion: a nil or foreign MsgQueue becomes an error instead
	// of a panic, since this method already reports failures via its return.
	mqconfig, ok := n.config.MsgQueue.(*NsqMsgQueue)
	if !ok || mqconfig == nil {
		return errorx.New("MsgQueue not configured")
	}

	nodeEventName := "node_" + eventName
	handler := nsq.HandlerFunc(func(message *nsq.Message) error {
		m := map[string]interface{}{}
		// Decode straight from the raw body; the string copy is only needed
		// for the log context on failure.
		if jsonerr := json.Unmarshal(message.Body, &m); jsonerr != nil {
			logx.Error(errorx.Wrap(jsonerr, "json.Unmarshal", datax.M{"event": eventName, "params": string(message.Body)}))
			return nil
		}

		if eventerr := eventx.GetEvent(nodeEventName).Dispatch(m); eventerr != nil {
			logx.Error(errorx.Wrap(eventerr, "dispatch event", datax.M{"event": eventName, "params": string(message.Body)}))
		}
		// Always nil: failures are logged above and deliberately not
		// propagated to the consumer.
		return nil
	})

	if _, err := mq.CreateConsumer(mqconfig.LookupdAddr, nodeEventName, GetIdent(), nsq.NewConfig(), handler); err != nil {
		return errorx.Wrap(err, "create consumer")
	}
	return nil
}

// GetEventDispatcher returns the eventx dispatcher stored under the
// node-prefixed name "node_"+eventName, the same name RegisterEvent
// dispatches to.
func (n *Node) GetEventDispatcher(eventName string) *eventx.Dispatcher {
	return eventx.GetEvent("node_" + eventName)
}

// PublishEvent publishes an event message through the message queue so that
// it is synchronized to every node. params is JSON-encoded and published to
// the topic "node_"+eventName.
//
// Nodes that called RegisterEvent(eventName) can listen for and handle the
// event through GetEventDispatcher(eventName).
//
// It returns an error when the producer cannot be obtained, when params
// cannot be marshalled, or when publishing fails.
func (n *Node) PublishEvent(eventName string, params map[string]interface{}) error {
	producer, err := n.getProducer()
	if err != nil {
		return err
	}
	// NOTE(review): if mq.GetProducer hands out a shared or cached producer,
	// closing it after every publish may be wrong. Confirm against package mq.
	defer producer.Close()

	// The fmt.Sprintf wrappers below are kept as-is: they are the only uses
	// of the fmt import in the visible file, and the messages are unchanged.
	bs, err := json.Marshal(params)
	if err != nil {
		return errorx.Wrap(err, fmt.Sprintf("json.Marshal"))
	}

	if err = producer.Publish("node_"+eventName, bs); err != nil {
		// Propagate the failure; previously the wrapped error was discarded
		// and nil was returned, hiding publish errors from callers.
		return errorx.Wrap(err, fmt.Sprintf("Publish"))
	}

	return nil
}

// getProducer returns the message-queue producer for the configured
// ProducerAddr.
//
// It returns an error when the message queue is not configured as
// *NsqMsgQueue, when ProducerAddr is empty, or when mq.GetProducer fails.
func (n *Node) getProducer() (*mq.Producer, error) {
	// Comma-ok assertion: a nil or foreign MsgQueue is reported as an error
	// instead of panicking in a function that already returns one.
	mqconfig, ok := n.config.MsgQueue.(*NsqMsgQueue)
	if !ok || mqconfig == nil {
		return nil, errorx.New("MsgQueue not configured")
	}
	if mqconfig.ProducerAddr == "" {
		return nil, errorx.New("MQProducerAddr not set")
	}

	producer, err := mq.GetProducer(mqconfig.ProducerAddr)
	if err != nil {
		return nil, errorx.Wrap(err, "GetProducer")
	}
	return producer, nil
}
